package com.huan.flink.partition.custom;

import com.huan.flink.partition.Student;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Custom partitioning demo: routes stream elements to downstream subtasks with a
 * user-supplied {@link Partitioner} implementation.
 *
 * <p>The job builds a small bounded stream of {@link Student} records, derives a
 * {@code String} key from each record's class id, and lets
 * {@link CustomHashPartitioner} decide which partition every record is sent to.
 *
 * @author huan.fu
 * @date 2023/9/23 - 08:25
 */
public class CustomPartitionApplication {

    /**
     * Builds and runs the demo job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run with a parallelism of 2.
        environment.setParallelism(2);

        // NOTE(review): the second constructor argument is presumably the class id
        // returned by getClassId() — confirm against Student.
        environment.fromElements(
                        new Student(1L, 1L, "张三"),
                        new Student(2L, 1L, "张三"),
                        new Student(3L, 2L, "张三"),
                        new Student(4L, 2L, "张三"),
                        new Student(5L, 2L, "张三"),
                        new Student(6L, 3L, "张三")
                )
                // Apply the custom partitioner; the key is the class id rendered as a String,
                // so records with the same class id always yield the same partition index.
                .partitionCustom(new CustomHashPartitioner(), person -> person.getClassId() + "")
                .print();

        // NOTE(review): the job name says "broadcast" although this job demonstrates custom
        // partitioning — likely a copy-paste leftover; kept unchanged here.
        environment.execute("broadcast job");
    }


    /**
     * Hash-based partitioner for {@code String} keys.
     */
    static class CustomHashPartitioner implements Partitioner<String> {

        /**
         * Computes the partition index for a key.
         *
         * <p>Uses {@link Math#floorMod(int, int)} rather than the {@code %} operator:
         * {@link String#hashCode()} may be negative, and {@code %} would then produce a
         * negative (invalid) partition index. For non-negative hash codes the result is
         * identical to {@code key.hashCode() % numPartitions}.
         *
         * @param key           the key to partition by
         * @param numPartitions the number of downstream partitions
         * @return the partition index, in the range {@code [0, numPartitions)}
         */
        @Override
        public int partition(String key, int numPartitions) {
            return Math.floorMod(key.hashCode(), numPartitions);
        }
    }
}
